#include "source/extensions/filters/network/zookeeper_proxy/decoder.h"

#include <string>

#include "source/common/common/enum_to_int.h"

namespace Envoy {
namespace Extensions {
namespace NetworkFilters {
namespace ZooKeeperProxy {

constexpr uint32_t BOOL_LENGTH = 1;
constexpr uint32_t INT_LENGTH = 4;
constexpr uint32_t LONG_LENGTH = 8;
constexpr uint32_t XID_LENGTH = 4;
constexpr uint32_t OPCODE_LENGTH = 4;
constexpr uint32_t ZXID_LENGTH = 8;
constexpr uint32_t TIMEOUT_LENGTH = 4;
constexpr uint32_t SESSION_LENGTH = 8;
constexpr uint32_t MULTI_HEADER_LENGTH = 9;
constexpr uint32_t PROTOCOL_VERSION_LENGTH = 4;
constexpr uint32_t SERVER_HEADER_LENGTH = 16;

const char* createFlagsToString(CreateFlags flags) {
  switch (flags) {
  case CreateFlags::Persistent:
    return "persistent";
  case CreateFlags::PersistentSequential:
    return "persistent_sequential";
  case CreateFlags::Ephemeral:
    return "ephemeral";
  case CreateFlags::EphemeralSequential:
    return "ephemeral_sequential";
  case CreateFlags::Container:
    return "container";
  case CreateFlags::PersistentWithTtl:
    return "persistent_with_ttl";
  case CreateFlags::PersistentSequentialWithTtl:
    return "persistent_sequential_with_ttl";
  }

  return "unknown";
}

absl::StatusOr<absl::optional<OpCodes>> DecoderImpl::decodeOnData(Buffer::Instance& data,
                                                                  uint64_t& offset) {
  ENVOY_LOG(trace, "zookeeper_proxy: decoding request with {} bytes at offset {}", data.length(),
            offset);

  // Check message length.
  const absl::StatusOr<int32_t> len = helper_.peekInt32(data, offset);
  EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(
      len, absl::nullopt, fmt::format("peekInt32 for len: {}", len.status().message()));

  ENVOY_LOG(trace, "zookeeper_proxy: decoding request with len {} at offset {}", len.value(),
            offset);

  absl::Status status = ensureMinLength(len.value(), XID_LENGTH + INT_LENGTH); // xid + opcode
  EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(
      status, absl::nullopt, fmt::format("ensureMinLength: {}", status.message()));

  status = ensureMaxLength(len.value());
  EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(
      status, absl::nullopt, fmt::format("ensureMaxLength: {}", status.message()));

  auto start_time = time_source_.monotonicTime();

  // Control requests, with XIDs <= 0.
  //
  // These are meant to control the state of a session:
  // connect, keep-alive, authenticate and set initial watches.
  //
  // Note: setWatches is a command historically used to set watches
  //       right after connecting, typically used when roaming from one
  //       ZooKeeper server to the next. Thus, the special xid.
  //       However, some client implementations might expose setWatches
  //       as a regular data request, so we support that as well.
  const absl::StatusOr<int32_t> xid = helper_.peekInt32(data, offset);
  EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(
      xid, absl::nullopt, fmt::format("peerInt32 for xid: {}", xid.status().message()));

  ENVOY_LOG(trace, "zookeeper_proxy: decoding request with xid {} at offset {}", xid.value(),
            offset);

  switch (static_cast<XidCodes>(xid.value())) {
  case XidCodes::ConnectXid:
    status = parseConnect(data, offset, len.value());
    RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(status,
                                            fmt::format("parseConnect: {}", status.message()));

    control_requests_by_xid_[xid.value()].push({OpCodes::Connect, std::move(start_time)});
    return OpCodes::Connect;
  case XidCodes::PingXid:
    offset += OPCODE_LENGTH;
    callbacks_.onPing();
    control_requests_by_xid_[xid.value()].push({OpCodes::Ping, std::move(start_time)});
    return OpCodes::Ping;
  case XidCodes::AuthXid:
    status = parseAuthRequest(data, offset, len.value());
    RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(status,
                                            fmt::format("parseAuthRequest: {}", status.message()));

    control_requests_by_xid_[xid.value()].push({OpCodes::SetAuth, std::move(start_time)});
    return OpCodes::SetAuth;
  case XidCodes::SetWatchesXid:
    offset += OPCODE_LENGTH;
    status = parseSetWatchesRequest(data, offset, len.value());
    RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(
        status, fmt::format("parseSetWatchesRequest: {}", status.message()));

    control_requests_by_xid_[xid.value()].push({OpCodes::SetWatches, std::move(start_time)});
    return OpCodes::SetWatches;
  default:
    // WATCH_XID is generated by the server, so that and everything
    // else can be ignored here.
    break;
  }

  // Data requests, with XIDs > 0.
  //
  // These are meant to happen after a successful control request, except
  // for two cases: auth requests can happen at any time and ping requests
  // must happen every 1/3 of the negotiated session timeout, to keep
  // the session alive.
  const absl::StatusOr<int32_t> oc = helper_.peekInt32(data, offset);
  EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(
      oc, absl::nullopt, fmt::format("peekInt32 for opcode: {}", oc.status().message()));

  ENVOY_LOG(trace, "zookeeper_proxy: decoding request with opcode {} at offset {}", oc.value(),
            offset);

  const auto opcode = static_cast<OpCodes>(oc.value());
  switch (opcode) {
  case OpCodes::GetData:
    status = parseGetDataRequest(data, offset, len.value());
    RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(
        status, fmt::format("parseGetDataRequest: {}", status.message()));
    break;
  case OpCodes::Create:
  case OpCodes::Create2:
  case OpCodes::CreateContainer:
  case OpCodes::CreateTtl:
    status = parseCreateRequest(data, offset, len.value(), opcode);
    RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(
        status, fmt::format("parseCreateRequest: {}", status.message()));
    break;
  case OpCodes::SetData:
    status = parseSetRequest(data, offset, len.value());
    RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(status,
                                            fmt::format("parseSetRequest: {}", status.message()));
    break;
  case OpCodes::GetChildren:
    status = parseGetChildrenRequest(data, offset, len.value(), false);
    RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(
        status, fmt::format("parseGetChildrenRequest (get children): {}", status.message()));
    break;
  case OpCodes::GetChildren2:
    status = parseGetChildrenRequest(data, offset, len.value(), true);
    RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(
        status, fmt::format("parseGetChildrenRequest (get children2): {}", status.message()));
    break;
  case OpCodes::Delete:
    status = parseDeleteRequest(data, offset, len.value());
    RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(
        status, fmt::format("parseDeleteRequest: {}", status.message()));
    break;
  case OpCodes::Exists:
    status = parseExistsRequest(data, offset, len.value());
    RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(
        status, fmt::format("parseExistsRequest: {}", status.message()));
    break;
  case OpCodes::GetAcl:
    status = parseGetAclRequest(data, offset, len.value());
    RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(
        status, fmt::format("parseGetAclRequest: {}", status.message()));
    break;
  case OpCodes::SetAcl:
    status = parseSetAclRequest(data, offset, len.value());
    RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(
        status, fmt::format("parseSetAclRequest: {}", status.message()));
    break;
  case OpCodes::Sync:
    status = callbacks_.onSyncRequest(pathOnlyRequest(data, offset, len.value()), opcode);
    RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(status,
                                            fmt::format("onSyncRequest: {}", status.message()));
    break;
  case OpCodes::Check:
    status = parseCheckRequest(data, offset, len.value());
    RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(status,
                                            fmt::format("parseCheckRequest: {}", status.message()));
    break;
  case OpCodes::Multi:
    status = parseMultiRequest(data, offset, len.value());
    RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(status,
                                            fmt::format("parseMultiRequest: {}", status.message()));
    break;
  case OpCodes::Reconfig:
    status = parseReconfigRequest(data, offset, len.value());
    RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(
        status, fmt::format("parseReconfigRequest: {}", status.message()));
    break;
  case OpCodes::SetWatches:
    status = parseSetWatchesRequest(data, offset, len.value());
    RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(
        status, fmt::format("parseSetWatchesRequest: {}", status.message()));
    break;
  case OpCodes::SetWatches2:
    status = parseSetWatches2Request(data, offset, len.value());
    RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(
        status, fmt::format("parseSetWatches2Request: {}", status.message()));
    break;
  case OpCodes::AddWatch:
    status = parseAddWatchRequest(data, offset, len.value());
    RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(
        status, fmt::format("parseAddWatchRequest: {}", status.message()));
    break;
  case OpCodes::CheckWatches:
    status = parseXWatchesRequest(data, offset, len.value(), OpCodes::CheckWatches);
    RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(
        status, fmt::format("parseXWatchesRequest (check watches): {}", status.message()));
    break;
  case OpCodes::RemoveWatches:
    status = parseXWatchesRequest(data, offset, len.value(), OpCodes::RemoveWatches);
    RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(
        status, fmt::format("parseXWatchesRequest (remove watches): {}", status.message()));
    break;
  case OpCodes::GetEphemerals:
    status = callbacks_.onGetEphemeralsRequest(pathOnlyRequest(data, offset, len.value()), opcode);
    RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(
        status, fmt::format("onGetEphemeralsRequest: {}", status.message()));
    break;
  case OpCodes::GetAllChildrenNumber:
    status = callbacks_.onGetAllChildrenNumberRequest(pathOnlyRequest(data, offset, len.value()),
                                                      opcode);
    RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(
        status, fmt::format("onGetAllChildrenNumberRequest: {}", status.message()));
    break;
  case OpCodes::Close:
    callbacks_.onCloseRequest();
    break;
  default:
    ENVOY_LOG(debug, "zookeeper_proxy: decodeOnData failed: unknown opcode {}",
              enumToSignedInt(opcode));
    callbacks_.onDecodeError(absl::nullopt);
    return absl::nullopt;
  }

  requests_by_xid_[xid.value()] = {opcode, std::move(start_time)};

  return opcode;
}

absl::StatusOr<absl::optional<OpCodes>> DecoderImpl::decodeOnWrite(Buffer::Instance& data,
                                                                   uint64_t& offset) {
  ENVOY_LOG(trace, "zookeeper_proxy: decoding response with {} bytes at offset {}", data.length(),
            offset);

  // Check message length.
  const absl::StatusOr<int32_t> len = helper_.peekInt32(data, offset);
  EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(
      len, absl::nullopt, fmt::format("peekInt32 for len: {}", len.status().message()));

  ENVOY_LOG(trace, "zookeeper_proxy: decoding response with len.value() {} at offset {}",
            len.value(), offset);

  absl::Status status =
      ensureMinLength(len.value(), XID_LENGTH + ZXID_LENGTH + INT_LENGTH); // xid + zxid + err
  EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(
      status, absl::nullopt, fmt::format("ensureMinLength: {}", status.message()));

  status = ensureMaxLength(len.value());
  EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(
      status, absl::nullopt, fmt::format("ensureMaxLength: {}", status.message()));

  const absl::StatusOr<int32_t> xid = helper_.peekInt32(data, offset);
  EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(
      xid, absl::nullopt, fmt::format("peekInt32 for xid: {}", xid.status().message()));

  ENVOY_LOG(trace, "zookeeper_proxy: decoding response with xid {} at offset {}", xid.value(),
            offset);
  const auto xid_code = static_cast<XidCodes>(xid.value());

  absl::StatusOr<std::chrono::milliseconds> latency;
  OpCodes opcode;

  switch (xid_code) {
  case XidCodes::ConnectXid:
    ABSL_FALLTHROUGH_INTENDED;
  case XidCodes::PingXid:
    ABSL_FALLTHROUGH_INTENDED;
  case XidCodes::AuthXid:
    ABSL_FALLTHROUGH_INTENDED;
  case XidCodes::SetWatchesXid:
    latency = fetchControlRequestData(xid.value(), opcode);
    EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(
        latency, opcode, fmt::format("fetchControlRequestData: {}", latency.status().message()));
    break;
  case XidCodes::WatchXid:
    // WATCH_XID is generated by the server, no need to fetch opcode and latency here.
    break;
  default:
    latency = fetchDataRequestData(xid.value(), opcode);
    EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(
        latency, opcode, fmt::format("fetchDataRequestData: {}", latency.status().message()));
  }

  // Connect responses are special, they have no full reply header
  // but just an XID with no zxid nor error fields like the ones
  // available for all other server generated messages.
  if (xid_code == XidCodes::ConnectXid) {
    status = parseConnectResponse(data, offset, len.value(), latency.value());
    RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(
        status, fmt::format("parseConnectResponse: {}", status.message()))
    return opcode;
  }

  // Control responses that aren't connect, with XIDs <= 0.
  const absl::StatusOr<int64_t> zxid = helper_.peekInt64(data, offset);
  EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(
      zxid, opcode, fmt::format("peekInt64 for zxid: {}", zxid.status().message()));

  const absl::StatusOr<int32_t> error = helper_.peekInt32(data, offset);
  EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(
      error, opcode, fmt::format("peekInt32 for error: {}", error.status().message()));

  ENVOY_LOG(trace,
            "zookeeper_proxy: decoding response with zxid.value() {} and error {} at offset {}",
            zxid.value(), error.value(), offset);

  switch (xid_code) {
  case XidCodes::PingXid:
    callbacks_.onResponse(OpCodes::Ping, xid.value(), zxid.value(), error.value(), latency.value());
    return opcode;
  case XidCodes::AuthXid:
    callbacks_.onResponse(OpCodes::SetAuth, xid.value(), zxid.value(), error.value(),
                          latency.value());
    return opcode;
  case XidCodes::SetWatchesXid:
    callbacks_.onResponse(OpCodes::SetWatches, xid.value(), zxid.value(), error.value(),
                          latency.value());
    return opcode;
  case XidCodes::WatchXid:
    status = parseWatchEvent(data, offset, len.value(), zxid.value(), error.value());
    RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(status,
                                            fmt::format("parseWatchEvent: {}", status.message()));

    return absl::nullopt; // WATCH_XID is generated by the server, it has no corresponding opcode.
  default:
    break;
  }

  callbacks_.onResponse(opcode, xid.value(), zxid.value(), error.value(), latency.value());
  offset += (len.value() - (XID_LENGTH + ZXID_LENGTH + INT_LENGTH));

  return opcode;
}

absl::Status DecoderImpl::ensureMinLength(const int32_t len, const int32_t minlen) const {
  if (len < minlen) {
    return absl::InvalidArgumentError("packet is too small");
  }
  return absl::OkStatus();
}

absl::Status DecoderImpl::ensureMaxLength(const int32_t len) const {
  if (static_cast<uint32_t>(len) > max_packet_bytes_) {
    return absl::InvalidArgumentError("packet is too big");
  }
  return absl::OkStatus();
}

absl::Status DecoderImpl::parseConnect(Buffer::Instance& data, uint64_t& offset, uint32_t len) {
  absl::Status status =
      ensureMinLength(len, XID_LENGTH + ZXID_LENGTH + TIMEOUT_LENGTH + SESSION_LENGTH + INT_LENGTH);
  EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status, OpCodes::Connect);

  // Skip zxid, timeout, and session id.
  offset += ZXID_LENGTH + TIMEOUT_LENGTH + SESSION_LENGTH;

  // Skip password.
  status = skipString(data, offset);
  EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status, OpCodes::Connect);

  const absl::StatusOr<bool> readonly = maybeReadBool(data, offset);
  EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(readonly, OpCodes::Connect,
                                                               readonly.status().message());

  callbacks_.onConnect(readonly.value());

  return absl::OkStatus();
}

absl::Status DecoderImpl::parseAuthRequest(Buffer::Instance& data, uint64_t& offset, uint32_t len) {
  absl::Status status =
      ensureMinLength(len, XID_LENGTH + OPCODE_LENGTH + INT_LENGTH + INT_LENGTH + INT_LENGTH);
  EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status, OpCodes::SetAuth);
  // Skip opcode + type.
  offset += OPCODE_LENGTH + INT_LENGTH;

  const absl::StatusOr<std::string> scheme = helper_.peekString(data, offset);
  EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(scheme, OpCodes::SetAuth,
                                                               scheme.status().message());

  // Skip credential.
  status = skipString(data, offset);
  EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status, OpCodes::SetAuth);

  callbacks_.onAuthRequest(scheme.value());

  return absl::OkStatus();
}

absl::Status DecoderImpl::parseGetDataRequest(Buffer::Instance& data, uint64_t& offset,
                                              uint32_t len) {
  absl::Status status = ensureMinLength(len, XID_LENGTH + OPCODE_LENGTH + INT_LENGTH + BOOL_LENGTH);
  EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status, OpCodes::GetData);

  const absl::StatusOr<std::string> path = helper_.peekString(data, offset);
  EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(path, OpCodes::GetData,
                                                               path.status().message());

  const absl::StatusOr<bool> watch = helper_.peekBool(data, offset);
  EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(watch, OpCodes::GetData,
                                                               watch.status().message());

  callbacks_.onGetDataRequest(path.value(), watch.value());

  return absl::OkStatus();
}

absl::Status DecoderImpl::skipAcls(Buffer::Instance& data, uint64_t& offset) {
  const absl::StatusOr<int32_t> count = helper_.peekInt32(data, offset);
  RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(count,
                                          fmt::format("skipAcls: {}", count.status().message()));

  for (int i = 0; i < count.value(); ++i) {
    // Perms.
    absl::StatusOr<int32_t> perms = helper_.peekInt32(data, offset);
    RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(perms,
                                            fmt::format("skipAcls: {}", perms.status().message()));
    // Skip scheme.
    absl::Status status = skipString(data, offset);
    ABSL_STATUS_RETURN_IF_STATUS_NOT_OK(status);
    // Skip cred.
    status = skipString(data, offset);
    ABSL_STATUS_RETURN_IF_STATUS_NOT_OK(status);
  }

  return absl::OkStatus();
}

absl::Status DecoderImpl::parseCreateRequest(Buffer::Instance& data, uint64_t& offset, uint32_t len,
                                             OpCodes opcode) {
  absl::Status status = ensureMinLength(len, XID_LENGTH + OPCODE_LENGTH + (4 * INT_LENGTH));
  EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status, opcode);

  const absl::StatusOr<std::string> path = helper_.peekString(data, offset);
  EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(path, opcode,
                                                               path.status().message());

  // Skip data.
  status = skipString(data, offset);
  EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status, opcode);

  status = skipAcls(data, offset);
  EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status, opcode);

  absl::StatusOr<int32_t> flag_data = helper_.peekInt32(data, offset);
  EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(flag_data, opcode,
                                                               flag_data.status().message());

  const CreateFlags flags = static_cast<CreateFlags>(flag_data.value());
  status = callbacks_.onCreateRequest(path.value(), flags, opcode);
  EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status, opcode);

  return absl::OkStatus();
}

absl::Status DecoderImpl::parseSetRequest(Buffer::Instance& data, uint64_t& offset, uint32_t len) {
  absl::Status status = ensureMinLength(len, XID_LENGTH + OPCODE_LENGTH + (3 * INT_LENGTH));
  EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status, OpCodes::SetData);

  const absl::StatusOr<std::string> path = helper_.peekString(data, offset);
  EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(path, OpCodes::SetData,
                                                               path.status().message());

  // Skip data.
  status = skipString(data, offset);
  EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status, OpCodes::SetData);

  // Ignore version.
  absl::StatusOr<int32_t> version = helper_.peekInt32(data, offset);
  EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(version, OpCodes::SetData,
                                                               version.status().message());

  callbacks_.onSetRequest(path.value());

  return absl::OkStatus();
}

absl::Status DecoderImpl::parseGetChildrenRequest(Buffer::Instance& data, uint64_t& offset,
                                                  uint32_t len, const bool v2) {
  absl::Status status = ensureMinLength(len, XID_LENGTH + OPCODE_LENGTH + INT_LENGTH + BOOL_LENGTH);
  OpCodes opcode = OpCodes::GetChildren;
  if (v2) {
    opcode = OpCodes::GetChildren2;
  }
  EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status, opcode);

  const absl::StatusOr<std::string> path = helper_.peekString(data, offset);
  EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(path, opcode,
                                                               path.status().message());

  const absl::StatusOr<bool> watch = helper_.peekBool(data, offset);
  EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(watch, opcode,
                                                               watch.status().message());

  callbacks_.onGetChildrenRequest(path.value(), watch.value(), v2);

  return absl::OkStatus();
}

absl::Status DecoderImpl::parseDeleteRequest(Buffer::Instance& data, uint64_t& offset,
                                             uint32_t len) {
  absl::Status status = ensureMinLength(len, XID_LENGTH + OPCODE_LENGTH + (2 * INT_LENGTH));
  EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status, OpCodes::Delete);

  const absl::StatusOr<std::string> path = helper_.peekString(data, offset);
  EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(path, OpCodes::Delete,
                                                               path.status().message());

  const absl::StatusOr<int32_t> version = helper_.peekInt32(data, offset);
  EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(version, OpCodes::Delete,
                                                               version.status().message());

  callbacks_.onDeleteRequest(path.value(), version.value());

  return absl::OkStatus();
}

absl::Status DecoderImpl::parseExistsRequest(Buffer::Instance& data, uint64_t& offset,
                                             uint32_t len) {
  absl::Status status = ensureMinLength(len, XID_LENGTH + OPCODE_LENGTH + INT_LENGTH + BOOL_LENGTH);
  EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status, OpCodes::Exists);

  const absl::StatusOr<std::string> path = helper_.peekString(data, offset);
  EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(path, OpCodes::Exists,
                                                               path.status().message());

  const absl::StatusOr<bool> watch = helper_.peekBool(data, offset);
  EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(watch, OpCodes::Exists,
                                                               watch.status().message());

  callbacks_.onExistsRequest(path.value(), watch.value());

  return absl::OkStatus();
}

absl::Status DecoderImpl::parseGetAclRequest(Buffer::Instance& data, uint64_t& offset,
                                             uint32_t len) {
  absl::Status status = ensureMinLength(len, XID_LENGTH + OPCODE_LENGTH + INT_LENGTH);
  EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status, OpCodes::GetAcl);

  const absl::StatusOr<std::string> path = helper_.peekString(data, offset);
  EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(path, OpCodes::GetAcl,
                                                               path.status().message());

  callbacks_.onGetAclRequest(path.value());

  return absl::OkStatus();
}

absl::Status DecoderImpl::parseSetAclRequest(Buffer::Instance& data, uint64_t& offset,
                                             uint32_t len) {
  absl::Status status = ensureMinLength(len, XID_LENGTH + OPCODE_LENGTH + (3 * INT_LENGTH));
  EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status, OpCodes::SetAcl);

  const absl::StatusOr<std::string> path = helper_.peekString(data, offset);
  EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(path, OpCodes::SetAcl,
                                                               path.status().message());

  status = skipAcls(data, offset);
  EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status, OpCodes::SetAcl);

  const absl::StatusOr<int32_t> version = helper_.peekInt32(data, offset);
  EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(version, OpCodes::SetAcl,
                                                               version.status().message());

  callbacks_.onSetAclRequest(path.value(), version.value());

  return absl::OkStatus();
}

absl::StatusOr<std::string> DecoderImpl::pathOnlyRequest(Buffer::Instance& data, uint64_t& offset,
                                                         uint32_t len) {
  absl::Status status = ensureMinLength(len, XID_LENGTH + OPCODE_LENGTH + INT_LENGTH);
  RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(
      status, fmt::format("zookeeper_proxy: pathOnlyRequest failed: {}", status.message()));

  return helper_.peekString(data, offset);
}

absl::Status DecoderImpl::parseCheckRequest(Buffer::Instance& data, uint64_t& offset,
                                            uint32_t len) {
  absl::Status status = ensureMinLength(len, XID_LENGTH + OPCODE_LENGTH + (2 * INT_LENGTH));
  EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status, OpCodes::Check);

  const absl::StatusOr<std::string> path = helper_.peekString(data, offset);
  EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(path, OpCodes::Check,
                                                               path.status().message());

  const absl::StatusOr<int32_t> version = helper_.peekInt32(data, offset);
  EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(version, OpCodes::Check,
                                                               version.status().message());

  callbacks_.onCheckRequest(path.value(), version.value());

  return absl::OkStatus();
}

absl::Status DecoderImpl::parseMultiRequest(Buffer::Instance& data, uint64_t& offset,
                                            uint32_t len) {
  // Treat empty transactions as a decoding error, there should be at least 1 header.
  absl::Status status = ensureMinLength(len, XID_LENGTH + OPCODE_LENGTH + MULTI_HEADER_LENGTH);
  EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status, OpCodes::Multi);

  while (true) {
    const absl::StatusOr<int32_t> opcode = helper_.peekInt32(data, offset);
    EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(opcode, OpCodes::Multi,
                                                                 opcode.status().message());

    const absl::StatusOr<bool> done = helper_.peekBool(data, offset);
    EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(done, OpCodes::Multi,
                                                                 done.status().message());

    // Ignore error field.
    const absl::StatusOr<int32_t> error = helper_.peekInt32(data, offset);
    EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(error, OpCodes::Multi,
                                                                 error.status().message());

    if (done.value()) {
      break;
    }

    switch (static_cast<OpCodes>(opcode.value())) {
    case OpCodes::Create:
      status = parseCreateRequest(data, offset, len, OpCodes::Create);
      EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status, OpCodes::Create);
      break;
    case OpCodes::SetData:
      status = parseSetRequest(data, offset, len);
      EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status, OpCodes::SetData);
      break;
    case OpCodes::Check:
      status = parseCheckRequest(data, offset, len);
      EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status, OpCodes::Check);
      break;
    case OpCodes::Delete:
      status = parseDeleteRequest(data, offset, len);
      EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status, OpCodes::Delete);
      break;
    default:
      callbacks_.onDecodeError(absl::nullopt);
      return absl::InvalidArgumentError(
          fmt::format("unknown opcode within a transaction: {}", opcode.value()));
    }
  }

  callbacks_.onMultiRequest();

  return absl::OkStatus();
}

absl::Status DecoderImpl::parseReconfigRequest(Buffer::Instance& data, uint64_t& offset,
                                               uint32_t len) {
  absl::Status status =
      ensureMinLength(len, XID_LENGTH + OPCODE_LENGTH + (3 * INT_LENGTH) + LONG_LENGTH);
  EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status, OpCodes::Reconfig);

  // Skip joining.
  status = skipString(data, offset);
  EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status, OpCodes::Reconfig);

  // Skip leaving.
  status = skipString(data, offset);
  EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status, OpCodes::Reconfig);
  // Skip new members.
  status = skipString(data, offset);
  EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status, OpCodes::Reconfig);

  // Read config id.
  absl::StatusOr<int64_t> config_id = helper_.peekInt64(data, offset);
  EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(config_id, OpCodes::Reconfig,
                                                               config_id.status().message());

  callbacks_.onReconfigRequest();

  return absl::OkStatus();
}

absl::Status DecoderImpl::parseSetWatchesRequest(Buffer::Instance& data, uint64_t& offset,
                                                 uint32_t len) {
  absl::Status status =
      ensureMinLength(len, XID_LENGTH + OPCODE_LENGTH + LONG_LENGTH + (3 * INT_LENGTH));
  EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status, OpCodes::SetWatches);

  // Ignore relative Zxid.
  absl::StatusOr<int64_t> zxid = helper_.peekInt64(data, offset);
  EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(zxid, OpCodes::SetWatches,
                                                               zxid.status().message());

  // Data watches.
  status = skipStrings(data, offset);
  EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status, OpCodes::SetWatches);

  // Exist watches.
  status = skipStrings(data, offset);
  EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status, OpCodes::SetWatches);

  // Child watches.
  status = skipStrings(data, offset);
  EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status, OpCodes::SetWatches);

  callbacks_.onSetWatchesRequest();

  return absl::OkStatus();
}

absl::Status DecoderImpl::parseSetWatches2Request(Buffer::Instance& data, uint64_t& offset,
                                                  uint32_t len) {
  absl::Status status =
      ensureMinLength(len, XID_LENGTH + OPCODE_LENGTH + LONG_LENGTH + (5 * INT_LENGTH));
  EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status, OpCodes::SetWatches2);

  // Ignore relative Zxid.
  absl::StatusOr<int64_t> zxid = helper_.peekInt64(data, offset);
  EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(zxid, OpCodes::SetWatches2,
                                                               zxid.status().message());

  // Data watches.
  status = skipStrings(data, offset);
  EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status, OpCodes::SetWatches2);

  // Exist watches.
  status = skipStrings(data, offset);
  EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status, OpCodes::SetWatches2);

  // Child watches.
  status = skipStrings(data, offset);
  EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status, OpCodes::SetWatches2);

  // Persistent watches.
  status = skipStrings(data, offset);
  EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status, OpCodes::SetWatches2);

  // Persistent recursive watches.
  status = skipStrings(data, offset);
  EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status, OpCodes::SetWatches2);

  callbacks_.onSetWatches2Request();

  return absl::OkStatus();
}

absl::Status DecoderImpl::parseAddWatchRequest(Buffer::Instance& data, uint64_t& offset,
                                               uint32_t len) {
  absl::Status status = ensureMinLength(len, XID_LENGTH + OPCODE_LENGTH + (2 * INT_LENGTH));
  EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status, OpCodes::AddWatch);

  const absl::StatusOr<std::string> path = helper_.peekString(data, offset);
  EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(path, OpCodes::AddWatch,
                                                               path.status().message());

  const absl::StatusOr<int32_t> mode = helper_.peekInt32(data, offset);
  EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(mode, OpCodes::AddWatch,
                                                               mode.status().message());

  callbacks_.onAddWatchRequest(path.value(), mode.value());

  return absl::OkStatus();
}

absl::Status DecoderImpl::parseXWatchesRequest(Buffer::Instance& data, uint64_t& offset,
                                               uint32_t len, OpCodes opcode) {
  absl::Status status = ensureMinLength(len, XID_LENGTH + OPCODE_LENGTH + (2 * INT_LENGTH));
  EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status, opcode);

  const absl::StatusOr<std::string> path = helper_.peekString(data, offset);
  EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(path, opcode,
                                                               path.status().message());

  const absl::StatusOr<int32_t> watch_type = helper_.peekInt32(data, offset);
  EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(watch_type, opcode,
                                                               watch_type.status().message());

  if (opcode == OpCodes::CheckWatches) {
    callbacks_.onCheckWatchesRequest(path.value(), watch_type.value());
  } else {
    callbacks_.onRemoveWatchesRequest(path.value(), watch_type.value());
  }

  return absl::OkStatus();
}

absl::Status DecoderImpl::skipString(Buffer::Instance& data, uint64_t& offset) {
  const absl::StatusOr<int32_t> slen = helper_.peekInt32(data, offset);
  RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(slen,
                                          fmt::format("skipString: {}", slen.status().message()));

  if (slen.value() < 0) {
    ENVOY_LOG(trace,
              "zookeeper_proxy: decoding response with negative string length {} at offset {}",
              slen.value(), offset);
    return absl::OkStatus();
  }

  helper_.skip(slen.value(), offset);

  return absl::OkStatus();
}

absl::Status DecoderImpl::skipStrings(Buffer::Instance& data, uint64_t& offset) {
  const absl::StatusOr<int32_t> count = helper_.peekInt32(data, offset);
  RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(count,
                                          fmt::format("skipStrings: {}", count.status().message()));

  for (int i = 0; i < count.value(); ++i) {
    absl::Status status = skipString(data, offset);
    ABSL_STATUS_RETURN_IF_STATUS_NOT_OK(status);
  }

  return absl::OkStatus();
}

Network::FilterStatus DecoderImpl::onData(Buffer::Instance& data) {
  return decodeAndBuffer(data, DecodeType::READ, zk_filter_read_buffer_);
}

Network::FilterStatus DecoderImpl::onWrite(Buffer::Instance& data) {
  return decodeAndBuffer(data, DecodeType::WRITE, zk_filter_write_buffer_);
}

Network::FilterStatus DecoderImpl::decodeAndBuffer(Buffer::Instance& data, DecodeType dtype,
                                                   Buffer::OwnedImpl& zk_filter_buffer) {
  const uint32_t zk_filter_buffer_len = zk_filter_buffer.length();
  absl::Status status;

  if (zk_filter_buffer_len == 0) {
    status = decodeAndBufferHelper(data, dtype, zk_filter_buffer);
    if (!status.ok()) {
      ENVOY_LOG(debug, "zookeeper_proxy: decodeAndBufferHelper failed: {}", status.message());
    }

    return Network::FilterStatus::Continue;
  }

  // ZooKeeper filter buffer contains partial packet data from the previous network filter buffer.
  // Prepending ZooKeeper filter buffer to the current network filter buffer can help to generate
  // full packets.
  data.prepend(zk_filter_buffer);

  status = decodeAndBufferHelper(data, dtype, zk_filter_buffer);
  if (!status.ok()) {
    ENVOY_LOG(debug, "zookeeper_proxy: decodeAndBufferHelper failed: {}", status.message());
  }

  // Drain the prepended ZooKeeper filter buffer.
  data.drain(zk_filter_buffer_len);
  return Network::FilterStatus::Continue;
}

absl::Status DecoderImpl::decodeAndBufferHelper(Buffer::Instance& data, DecodeType dtype,
                                                Buffer::OwnedImpl& zk_filter_buffer) {
  ASSERT(dtype == DecodeType::READ || dtype == DecodeType::WRITE);

  const uint32_t data_len = data.length();
  uint64_t offset = 0;
  absl::StatusOr<int32_t> len = 0;
  absl::Status status;
  // Boolean to check whether there is at least one full packet in the network filter buffer (to
  // which the ZooKeeper filter buffer is prepended).
  bool has_full_packets = false;

  while (offset < data_len) {
    TRY_NEEDS_AUDIT {
      // Peek packet length.
      len = helper_.peekInt32(data, offset);
      EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(
          len, absl::nullopt, fmt::format("peekInt32 for len: {}", len.status().message()));

      status = ensureMinLength(len.value(), dtype == DecodeType::READ
                                                ? XID_LENGTH + INT_LENGTH
                                                : XID_LENGTH + ZXID_LENGTH + INT_LENGTH);
      EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status, absl::nullopt);

      status = ensureMaxLength(len.value());
      EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status, absl::nullopt);

      offset += len.value();
      if (offset <= data_len) {
        has_full_packets = true;
      }
    }
    END_TRY catch (const EnvoyException& e) {
      IS_ENVOY_BUG(fmt::format("zookeeper_proxy: decodeAndBufferHelper failed: {}", e.what()));
      callbacks_.onDecodeError(absl::nullopt);
      return absl::OkStatus();
    }
  }

  if (offset == data_len) {
    decode(data, dtype, offset);
    return absl::OkStatus();
  }

  ASSERT(offset > data_len);
  std::string temp_data;

  if (has_full_packets) {
    offset -= INT_LENGTH + len.value();
    ASSERT(offset < data_len);
    // Decode full packets.
    // offset here represents the length of all full packets.
    decode(data, dtype, offset);

    // Copy out the rest of the data to the ZooKeeper filter buffer.
    temp_data.resize(data_len - offset);
    data.copyOut(offset, data_len - offset, temp_data.data());
    zk_filter_buffer.add(temp_data.data(), temp_data.length());
  } else {
    // Copy out all the data to the ZooKeeper filter buffer, since after prepending the ZooKeeper
    // filter buffer is drained by the prepend() method.
    temp_data.resize(data_len);
    data.copyOut(0, data_len, temp_data.data());
    zk_filter_buffer.add(temp_data.data(), temp_data.length());
  }

  return absl::OkStatus();
}

void DecoderImpl::decode(Buffer::Instance& data, DecodeType dtype, uint64_t full_packets_len) {
  uint64_t offset = 0;

  TRY_NEEDS_AUDIT {
    while (offset < full_packets_len) {
      // Reset the helper's cursor, to ensure the current message stays within the
      // allowed max length, even when it's different than the declared length
      // by the message.
      //
      // Note: we need to keep two cursors — offset and helper_'s internal one — because
      //       a buffer may contain multiple messages, so offset is global while helper_'s
      //       internal cursor gets reset for each individual message.
      helper_.reset();

      const uint64_t current = offset;
      absl::StatusOr<absl::optional<OpCodes>> opcode;
      switch (dtype) {
      case DecodeType::READ:
        opcode = decodeOnData(data, offset);
        if (opcode.ok()) {
          callbacks_.onRequestBytes(opcode.value(), offset - current);
          break;
        }
        ENVOY_LOG(debug, "zookeeper_proxy: decodeOnData failed: {}", opcode.status().message());
        return;
      case DecodeType::WRITE:
        opcode = decodeOnWrite(data, offset);
        if (opcode.ok()) {
          callbacks_.onResponseBytes(opcode.value(), offset - current);
          break;
        }
        ENVOY_LOG(debug, "zookeeper_proxy: decodeOnWrite failed: {}", opcode.status().message());
        return;
      }
    }
  }
  END_TRY catch (const EnvoyException& e) {
    IS_ENVOY_BUG(fmt::format("zookeeper_proxy: decode failed: {}", e.what()));
    callbacks_.onDecodeError(absl::nullopt);
  }
}

absl::Status DecoderImpl::parseConnectResponse(Buffer::Instance& data, uint64_t& offset,
                                               uint32_t len,
                                               const std::chrono::milliseconds latency) {
  absl::Status status =
      ensureMinLength(len, PROTOCOL_VERSION_LENGTH + TIMEOUT_LENGTH + SESSION_LENGTH + INT_LENGTH);
  EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status, OpCodes::Connect);

  const absl::StatusOr<int32_t> timeout = helper_.peekInt32(data, offset);
  EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(timeout, OpCodes::Connect,
                                                               timeout.status().message());

  // Skip session id + password.
  offset += SESSION_LENGTH;
  status = skipString(data, offset);
  EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status, OpCodes::Connect);

  const absl::StatusOr<bool> readonly = maybeReadBool(data, offset);
  EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(readonly, OpCodes::Connect,
                                                               readonly.status().message());

  callbacks_.onConnectResponse(0, timeout.value(), readonly.value(), latency);

  return absl::OkStatus();
}

absl::Status DecoderImpl::parseWatchEvent(Buffer::Instance& data, uint64_t& offset,
                                          const uint32_t len, const int64_t zxid,
                                          const int32_t error) {
  absl::Status status = ensureMinLength(len, SERVER_HEADER_LENGTH + (3 * INT_LENGTH));
  EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status, absl::nullopt);

  const absl::StatusOr<int32_t> event_type = helper_.peekInt32(data, offset);
  EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(event_type, absl::nullopt,
                                                               event_type.status().message());

  const absl::StatusOr<int32_t> client_state = helper_.peekInt32(data, offset);
  EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(client_state, absl::nullopt,
                                                               client_state.status().message());

  const absl::StatusOr<std::string> path = helper_.peekString(data, offset);
  EMIT_DECODER_ERR_AND_RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(path, absl::nullopt,
                                                               path.status().message());

  callbacks_.onWatchEvent(event_type.value(), client_state.value(), path.value(), zxid, error);

  return absl::OkStatus();
}

absl::StatusOr<bool> DecoderImpl::maybeReadBool(Buffer::Instance& data, uint64_t& offset) {
  if (data.length() >= offset + 1) {
    return helper_.peekBool(data, offset);
  }
  return false;
}

absl::StatusOr<std::chrono::milliseconds> DecoderImpl::fetchControlRequestData(const int32_t xid,
                                                                               OpCodes& opcode) {
  // Find the corresponding request queue for this XID.
  const auto it = control_requests_by_xid_.find(xid);

  // If this fails, it's either a server-side bug or a malformed packet.
  if (it == control_requests_by_xid_.end()) {
    return absl::InvalidArgumentError(fmt::format("control request xid {} not found", xid));
  }

  std::queue<RequestBegin>& rq_queue = it->second;
  if (rq_queue.empty()) {
    return absl::InvalidArgumentError(fmt::format("control request queue for {} is empty", xid));
  }

  std::chrono::milliseconds latency = std::chrono::duration_cast<std::chrono::milliseconds>(
      time_source_.monotonicTime() - rq_queue.front().start_time);
  opcode = rq_queue.front().opcode;
  rq_queue.pop();

  return latency;
}

absl::StatusOr<std::chrono::milliseconds> DecoderImpl::fetchDataRequestData(const int32_t xid,
                                                                            OpCodes& opcode) {
  // Find the corresponding request for this XID.
  const auto it = requests_by_xid_.find(xid);

  // If this fails, it's either a server-side bug or a malformed packet.
  if (it == requests_by_xid_.end()) {
    return absl::InvalidArgumentError(fmt::format("data request xid {} not found", xid));
  }

  std::chrono::milliseconds latency = std::chrono::duration_cast<std::chrono::milliseconds>(
      time_source_.monotonicTime() - it->second.start_time);
  opcode = it->second.opcode;
  requests_by_xid_.erase(it);

  return latency;
}

} // namespace ZooKeeperProxy
} // namespace NetworkFilters
} // namespace Extensions
} // namespace Envoy
